Implement CHANGES table function for changelog reads#36869
Conversation
antiguru
left a comment
There was a problem hiding this comment.
Thorough review focused on correctness, dedup/refactoring, docs, and test gaps. This is a large, carefully-built PR — excellent design doc, focused regression tests for the two optimizer panics it flushed out, and unusually complete fan-out across the transform analyses. Findings below, ordered by severity.
Correctness
1. (High confidence) mz_now() in a one-off query body containing CHANGES resolves to the changelog start, not the query time
src/adapter/src/optimize/peek.rs (~L360–386). The dataflow as_of is set to the query timestamp and then overridden down to the changelog start:
df_desc.set_as_of(timestamp_ctx.antichain()); // as_of = query time
if let Some(changelog_as_of) = changelog_as_of {
df_desc.set_as_of(Antichain::from_elem(changelog_as_of)); // as_of = start (earlier!)
}
let as_of = df_desc.as_of....into_option()...; // = start
let style = ExprPrepOneShot { logical_time: EvalTime::Time(as_of), .. }; // mz_now() -> startBefore this PR as_of == timestamp_ctx.timestamp() invariably, so logical_time was the query logical time. The override breaks that invariant: any mz_now() elsewhere in the same one-off query now folds to the changelog start, while the peek still samples at the query timestamp. So SELECT mz_now() FROM CHANGES(t AS OF 5) LIMIT 1 would return 5, and temporal filters in the body would be evaluated at the start. The changelog's own bound is fine (resolved earlier from timestamp_ctx.timestamp()). Fix: derive logical_time from timestamp_ctx.timestamp(), not from the post-override as_of. No test exercises this combination.
2. (Medium) Multiple CHANGES reads with distinct bounds in one query collapse to the minimum as_of
peek.rs computes changelog_as_of = min(all starts) and pins the single dataflow as_of to it; every import is flagged ChangelogMode::OneShot, which carries no per-import start. render.rs's OneShot path then reads each flagged source at that one dataflow.as_of. So ... FROM CHANGES(c AS OF 100), CHANGES(d AS OF 50) reads c from 50 — spurious rows with mz_timestamp ∈ [50,100). Same for sliding bounds with different lags. Latent semantic bug, untested; at minimum note it in the design doc's "sharp edges," ideally give OneShot a per-import start like Maintained { start }.
Documentation inaccuracies
3. "Identical to SUBSCRIBE" overstates the equivalence
Design doc ("identical to SUBSCRIBE") and comments in relation.rs / query.rs ("matching SUBSCRIBE's diff format/shape"). Comparing to src/sql/src/plan/statement/dml.rs (~L1547–1566), the real SUBSCRIBE output differs three ways: column order (SUBSCRIBE prepends mz_timestamp, mz_diff; CHANGES appends), mz_timestamp type (SUBSCRIBE Numeric{scale 0} vs CHANGES MzTimestamp), and mz_diff nullability (SUBSCRIBE nullable(true) vs CHANGES nullable(false)). The CHANGES choices are reasonable, but say "the same columns, trailing, with mz_timestamp typed as mz_timestamp" rather than "identical."
4. pack_changelog_row doc comment is wrong for the maintained path
render.rs (~L1844): "time has already been advanced to max(time, as_of) by persist_source…". True only for OneShot. In Maintained, the source is read at start below as_of with the snapshot excluded, so the time written into mz_timestamp can be less than as_of — that's the point. Since this helper serves both paths, the comment is misleading exactly where it matters.
5. Gating error message names the wrong contexts
PlanError::ChangesRequiresSlidingBound displays "CHANGES in a materialized view or index requires a sliding bound", but plan_changes routes any maintained non-sliding lifetime here — including plain CREATE VIEW (tested at changes.slt:100), Subscribe, and Source (all is_maintained()). A CREATE VIEW user sees a message about MVs/indexes; and per the design's own lifetime matrix SUBSCRIBE is supposed to allow a fixed bound. Generalize the wording ("a maintained object") or special-case the view message.
Minor / pedantic
- Double-planning the bound.
plan_changesplans the bound viaplan_changes_bound, then re-plans it viaplan_as_of_or_up_tofor fixed bounds purely to validate, discarding the result. Two passes over one expr; comment notes it's validation-only. - Subquery alias dropped.
CHANGES((SELECT a AS x FROM t) AS OF …)reduces toGet(t); output column isa, notx(the rename lives only in the discarded scope). - Parser grab not feature-gated.
parse_table_factortreatsCHANGES (specially regardless ofenable_changes_table_function; a relation namedchangesimmediately followed by(would fail to parse even with the flag off. Low risk, but broader than the gated feature. - Zero-lag sliding bound.
AS OF AT LEAST mz_now()(no subtraction) passescontains_temporal()→ in an MV yieldswindow = 0and filtermz_now() < mz_timestamp. Degenerate, accepted, untested — reject or document. - Input column named
mz_timestamp/mz_diffyields duplicate column names in theRelationDesc(no dedup). Same latent issue asSUBSCRIBE, so not a regression, but unvalidated/untested.
Test gaps
- No
EXPLAINtest for the newChangesexplain-text in either the HIR (sql/src/plan/explain/text.rs) or MIR (expr/src/explain/text.rs) formatter. AnEXPLAIN SELECT … FROM CHANGES(…)slt case would lock inChanges <id> as_of=…/as_of_at_least=…. - No test for
mz_now()in the body of a one-offCHANGESquery (would surface #1). - No test for multiple
CHANGESwith distinct bounds in one query (#2). - No error-stream round-trip coverage for the reinterpretation (
errs.delay(...)inrender.rs).
Done well
- Every transform analysis gets an explicit
Changesarm rather than relying on a catch-all, and the two real panics this surfaced (NonNegativeleaf at post-order index 0; missingMreDiffarm) both have focused regression tests. - The scalar
boundis correctly excluded from the scalar visitors (relation.rstry_visit_scalars_mut1et al.), so prep never resolvesmz_now()inside the leaf prematurely — consistent with the opaque-leaf design and per-path resolution. - The
as_of_selectionsoft-constraint ordering (apply after the hard downstream constraint so a conflict degrades to the advisory fallback rather than overshooting) is subtle and well-tested (changelog_storage_constraints{,_conflict}). - Creation-time-only window-cap enforcement, with the explicit rationale that bootstrap re-optimization must not wedge on a lowered cap, is a good call and documented in three places.
Generated by Claude Code
|
Review findings addressed in a4fc08b. #1 + #2 (one-off #3 — equivalence claims corrected in the design doc and #4 — #5 — message generalized to "CHANGES with a fixed bound is only supported in one-off SELECT queries", which stays accurate for views/sources and remains correct until SUBSCRIBE (where fixed bounds become allowed) is wired up. #9 — zero-lag sliding bound kept (degenerate but coherent: empty window, only new changes enter) and locked in with an slt test. #7/#10 recorded as sharp edges in the design doc. EXPLAIN output locked in via slt for fixed and sliding bounds. Not changed: #8 — the parser runs before the catalog exists, so it cannot consult feature flags; this matches every other gated syntax. A relation named |
80ac1e3 to
c19b372
Compare
Foundation for the CHANGES(<collection>, <as_of>) table function (reads a collection as an append-only changelog). This lands the verified, independently testable pieces; the IR/execution wiring follows. - Compute reinterpretation (highest-risk core): pack_changelog_row + an append-only changelog transform in mz_compute::render, gated by the new SourceImport::read_as_changelog flag. Consolidates before packing the diff into a column so the snapshot collapses to as_of. Unit-tested. - SourceImport carries read_as_changelog through compute-types/compute-client; DataflowDescription::set_source_read_as_changelog lets the coordinator mark a changelog source import. import_source gains the flag (default false). - Parser/AST: non-reserved CHANGES keyword + TableFactor::Changes, parsed in parse_table_factor, with display + roundtrip tests. Name resolver handles it. - Planner: plan_changes gates on the feature flag, requires a persist-backed object (table/source/materialized view), and evaluates the constant as_of; execution wiring is bail_unsupported for now. - Feature flag enable_changes_table_function. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Adds the dedicated, optimizer-opaque MirRelationExpr::Changes node and wires it
through the IR pipeline (the SQL surface now plans through to LIR):
- mz-expr: MirRelationExpr::Changes { id, typ, as_of } leaf; handled in typ,
arity, keys, visitors, depends_on, and EXPLAIN. No new proto/Arbitrary.
- mz-transform: Changes treated as an opaque barrier leaf across analyses and
transforms (registers nothing in gets maps). The typechecker accepts its
extended (input + mz_timestamp + mz_diff) type as-is without cross-checking
against the catalog type of id.
- mz-compute-types: LIR lowering lowers Changes to a Get of the (changelog-
marked) source import; rendering already appends the columns and emits +1.
- mz-sql: HirRelationExpr::Changes + HIR->MIR lowering; plan_changes now builds
the extended RelationDesc and produces the node (no longer bails).
Remaining: coordinator wiring (mark the source import read_as_changelog with the
extended type, set the dataflow as_of to the changelog as_of, take the read
hold, and peek at the latest time).
https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
In the peek optimizer's finalize, scan the dataflow for MirRelationExpr::Changes nodes, mark the corresponding source imports read_as_changelog, and pin the dataflow as_of to the earliest changelog start (so the snapshot is taken there and later changes replay as appends; the peek still happens at the determined timestamp). The dataflow's implied read hold pins the input's since at the changelog as_of for the query's lifetime. This completes the one-off SELECT ... FROM CHANGES(coll, as_of) path. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
- test/sqllogictest/changes.slt: deterministic plan-time coverage (feature gating; rejection of views, CTEs, unmaterializable/negative as_of). - test/testdrive/changes.td: data round-trip. Captures a valid as_of via mz_now(), then asserts the snapshot collapses to as_of and later changes append (insert as +1, delete as a row with mz_diff = -1), and that CHANGES composes with GROUP BY/aggregation. Asserts data + mz_diff only, since the per-update timestamp is non-deterministic. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Replace the comma-separated `CHANGES(name, expr)` argument with an
`AS OF [AT LEAST] <bound>` clause that mirrors SUBSCRIBE's surface. The
bound now varies along two orthogonal axes:
* strict (`AS OF`) vs. advisory (`AS OF AT LEAST`), carried by AsOf
* fixed (constant) vs. sliding (`mz_now()`-relative), detected via
contains_temporal()
Gate by lifetime: a durable maintained object (materialized view,
index) requires a sliding bound, since a fixed lower bound would hold
the input's `since` open indefinitely. A sliding bound parses and
passes gating but its dataflow wiring (output temporal filter + lagging
read policy) is not yet implemented, so it currently bail_unsupported.
The fixed-bound one-off SELECT path is unchanged.
Update the design doc with the two-axis model, the pruned
lifetime x bound matrix, and the resolved parameter-keyword question.
Carry the changelog lower bound as an mz_timestamp-typed MirScalarExpr on the Changes node instead of a pre-folded Timestamp, and evaluate it in the peek coordinator at the query time (resolving mz_now()). A fixed bound folds to a constant; a sliding mz_now()-relative bound yields as_of = query_time - lag, so a one-off SELECT reads the window [query_time - lag, query_time] with no output temporal filter needed (the peek time is the upper edge). Gating: only one-off SELECTs are wired for execution; durable maintained objects with a fixed bound are rejected (indefinite hold), and other non-SELECT uses bail as unsupported. The maintained sliding case (lagging read policy) remains a follow-up. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The CHANGES relation argument can now be either a collection named directly or
a parenthesized subquery, mirroring SUBSCRIBE's surface. The AST gains a
ChangesRelation { Name, Query } enum; the planner resolves either form to the
single persist-backed collection it reads. A subquery must reduce to a bare
read (identity projection / empty map over a global Get) of a table, source, or
materialized view; anything that filters or transforms is the deferred
arbitrary-expression case and is rejected as unsupported. Unlike the name form,
a subquery transparently reads through a non-materialized view (which inlines)
to its underlying persist collection.
https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Thread the strict/advisory distinction (AS OF vs AS OF AT LEAST) onto the Changes IR node, and the inputs' read frontier (determination.since) into the peek optimizer. When resolving the changelog as_of at the query time, an advisory bound is clamped up to `since` (aging in to the earliest available history rather than erroring), while a strict bound is pinned as written (if it precedes `since`, dataflow creation errors, surfacing the unavailable history). The peek's existing read holds already pin `since` for the query duration, so no new hold is needed; the dataflow as_of must clear `since` anyway, making the clamp both correct and required. Non-peek callers (frontend_peek SELECT path) thread the same frontier; explain-only paths (insights) pass a minimal since. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The Changes variant was missing an arm in MreDiff (the structured-diff backing MirRelationExpr::eq), so two identical Changes nodes compared unequal via eq while cmp reported them equal, tripping the eq/cmp consistency soft-assert. In CI's debug build this panicked the optimizer (its fixpoint compares an expression to its prior form), so every CHANGES execution failed. Add the Changes arm and a regression test asserting eq agrees with cmp. Also: - Exclude changes.slt from --auto-index-selects: CHANGES is rejected in an index/materialized-view context, so wrapping its SELECTs in an indexed view is intentionally inconsistent. - Shorten the durable-lifetime gating error to a one-line message and move the mz_now()-relative suggestion into a hint (new PlanError::ChangesRequiresSlidingBound). https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
- Design doc: add "Maintained materialized views: sliding execution" describing the intended (not-yet-implemented) MV path — gating, a lagging read policy on the input, a continuous output temporal filter, and bounded-restart semantics — and why it is deferred until it can be runtime-tested as a unit. - testdrive: add a CHANGES-over-materialized-view round-trip, covering a non-table persist-backed input and changes flowing through the MV. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…s leaf
The NonNegative analysis used a `_ => results[index - 1]` catch-all for
single-input operators, with explicit arms only for the `Constant` and `Get`
leaves. The `Changes` leaf was never given an arm, so it fell into the
catch-all: for a `Changes` node at post-order index 0 (e.g. `SELECT a, mz_diff
FROM CHANGES(t)`, which optimizes to `Project(Changes)`), that computes
`results[0 - 1]` = `results[usize::MAX]` and panics the optimizer with an
out-of-bounds index ("the len is 2 but the index is 18446744073709551615").
A `count(*)` plan hit the same underflow with a larger node count.
Add an explicit `Changes => false` leaf arm: a changelog read is conservatively
treated as possibly-retracting (the maintained sliding case ages rows out of
the window), and, like Constant/Get, never reaches the indexing catch-all.
Other Changes arms (eq/cmp, monotonic, equivalences, arity, types, keys, column
names, cardinality) were added when the variant was introduced because those
matches are exhaustive; only NonNegative's catch-all let the leaf slip through.
Add a regression test running the analysis on `Project(Changes)`.
https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…etation Executing a one-off `CHANGES` peek panicked at runtime in the persist source (linear.rs, MfpPlan projection): "index out of bounds" while projecting the changelog read's MFP over the raw source rows. Rendering reads the raw source via `persist_source` and only *then* reinterprets each `(row, time, diff)` into the extended changelog row `(input.., mz_timestamp, mz_diff)` (render.rs, gated on `read_as_changelog`). But two optimizer passes push the surrounding MFP down into the source operators, where `persist_source` applies it *before* the reinterpretation — so a projection referencing the appended `mz_timestamp`/`mz_diff` columns (e.g. `SELECT a, mz_diff FROM CHANGES(t)` -> projection [0,2]) indexes past the raw, arity-1 row. Skip both pushdowns for source imports flagged `read_as_changelog`, leaving the MFP on the `Get` where rendering applies it to the reinterpreted, extended collection: - `Plan::refine_source_mfps` (LIR finalize) — the projection pushdown that caused the panic. - `optimize_dataflow_filters` (MIR) — the analogous filter pushdown; not hit by the current tests (no `WHERE` on `CHANGES`) but the same hazard. Also exclude changes.slt from --auto-index-selects in the *fast* SLT config (compileFastSltConfig's tests_without_views); a prior commit only covered the slow config, so the fast config still wrapped CHANGES SELECTs in indexed views (rejected in a maintained context) and reported InconsistentViewOutcome. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The slt and design doc claimed a subquery `(SELECT * FROM v)` transparently
reads through a non-materialized view to its underlying collection. It does not:
a view is not inlined at this planning stage, so the subquery reduces to a bare
`Get` of the view itself, which `plan_changes_input` then rejects ("v is a
view") — the same as the name form. Reading through to an underlying shard is
the deferred arbitrary-expression case (and only an identity view would even
qualify). Align the test expectation and the doc with that behavior.
https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Adds a deterministic look-back: a table with a generous RETAIN HISTORY window keeps `since` pinned far behind `upper`, so a strict `AS OF <past>` bound reads the full historical changelog as written — neither clamped up to `since` (as an advisory `AS OF AT LEAST` bound is) nor rejected (as a strict bound is once compaction advances past it on default retention). Covers a snapshot collapse plus inserts, a deletion, and an UPDATE (retract + insert) after the bound, and checks the per-key net over the window. This is the strict-vs-advisory case the existing tests did not exercise: prior look-backs raced compaction on a default-retention table. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Replace the lagging-read-policy sketch with the worked design: compute controller lags changelog imports' dependency holds behind the output frontier, rendering skips the snapshot and reads the input from as_of - i, and as-of selection gains a changelog-aware hard constraint. Restart then reproduces the window exactly; retract-and-re-emit becomes the advisory fallback. Records the durability/bootstrap chain and 0dt protections that make restart safe. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements the maintained path from the design doc: * The source import carries a ChangelogMode: one-off reads stay at the dataflow as_of with the snapshot included; maintained reads exclude the snapshot and replay deltas from a coordinator-resolved start (join(since, as_of - window)), computed at creation (sequencer), bootstrap (as-of selection), and command-history reduction (replica reconnect), so replicas always see a deterministic, readable start. * The compute controller lags the dependency read holds of changelog imports behind the output read frontier by the window, so a restarted dataflow can re-read its window and reproduce the persisted output exactly; as-of selection gains a matching soft lower-bound constraint (as_of >= since + window) that degrades to the advisory fallback when the since has slipped. * Rendering reads maintained changelog imports below the dataflow as_of with SnapshotMode::Exclude and advances emitted (and error) times to the as_of, keeping the true update time in the mz_timestamp column. * The MV optimizer extracts the constant window lag from the bound (rejecting calendar-dependent month intervals and REFRESH schedules) and wraps each changelog read in the temporal filter mz_now() < mz_timestamp + i, which retracts changes at the window's trailing edge and bounds the MV's state. * mz_timestamp deliberately supports no arithmetic, so the sliding bound mz_now() - <interval> is blessed as special syntax of the AS OF clause, planning as (mz_now()::timestamptz - <interval>)::mz_timestamp with existing casts; it never becomes a dataflow predicate. Strict sliding bounds, indexes, and other maintained contexts remain gated. Restart coverage is future work. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The maintained materialized-view path is implemented; replace the deferral note with a map of where each piece lives and a detailed remaining-work list (restart verification, zero-slack mitigation, window cap feature flag, month intervals, indexes, strict sliding bounds, SUBSCRIBE, hold introspection) so the work can be resumed. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
End-to-end verification of the restart-exact-reproduction claim: a maintained sliding-window CHANGES materialized view must survive restarts and upgrades with its window contents intact. Exact row multisets catch spurious snapshot rows and replayed changes; cross-phase timestamp ordering catches a restart that re-snapshots the window instead of resuming the changelog. Each materialized view is created over a freshly created, empty table with all DML after creation, so expected contents are deterministic (creating a changelog view over a non-empty table races the input's since for which changes land in the initial snapshot). Verified locally with --scenario=NoRestartNoUpgrade and --scenario=RestartEntireMz. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The window directly sizes the compaction hold a maintained CHANGES materialized view imposes on its input, and nothing bounded it: a mz_now() - INTERVAL '10 years' bound would pin ten years of history. Cap it with a new system variable, changes_max_window (default 1 day), so wider windows are an explicit opt-in. Enforced in the sequencer at CREATE MATERIALIZED VIEW rather than in the optimizer's changelog_window pass: the optimizer also runs at bootstrap, where erroring would wedge the system if the cap were lowered below an existing view's window. Creation-time-only enforcement grandfathers existing objects. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Only bare-read subqueries are supported; a subquery that filters or transforms reopens the time-invariance problem deferred in Alternatives. Record it in the remaining-work list with the current workaround (materialize the derived query, then CHANGES over it). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Changes is deliberately an opaque barrier leaf today. Record the sound relaxations (demand/projection pushdown, commuting non-temporal predicate pushdown, precise analysis values, statistics) and their role as groundwork for arbitrary expressions. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Reference page for the CHANGES table function: syntax, output columns, strict vs advisory bounds, history/compaction caveats, the allowed contexts, the changes_max_window cap, and worked examples. Marked private preview, matching the feature flag. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review findings on the one-off path. The peek optimizer pinned the dataflow as_of down to min(changelog starts), which broke two things: mz_now() elsewhere in the query body folded to the changelog start rather than the query time, and multiple CHANGES with distinct bounds collapsed to the earliest start. Give ChangelogMode::OneShot a per-import resolved start, mirroring Maintained: rendering reads each changelog import at its own start (snapshot included, collapsing history there) and the reinterpretation advances differential times to the as_of, which now stays the query time. The controller installs the import's read hold at the start, so a strict bound below the input's since still fails dataflow creation. Multiple one-off reads of the *same* input share the earliest start (one import, one snapshot collapse) — recorded as a sharp edge. Also from the review: generalize the fixed-bound gating error message, correct the SUBSCRIBE output-shape equivalence claims (columns trail rather than lead; mz_timestamp is mz_timestamp, not numeric; mz_diff is non-null), fix the pack_changelog_row doc comment for the maintained path, record further sharp edges (subquery alias dropped, duplicate mz_timestamp/mz_diff column names, query-wide since clamp), and add tests: mz_now() in the body of a CHANGES query, two CHANGES with distinct bounds in one query, EXPLAIN output, and a zero-lag sliding bound. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A one-off CHANGES with a strict AS OF bound below the input's since
panicked instead of erroring, in several ways. Reported by Moritz on
the materialized-view repro `CHANGES(mv AS OF mz_now() - '1' day)`.
Three fixes:
* The peek optimizer rejects a strict bound below `since` with a
dedicated error (ChangesHistoryUnavailable) naming the bound and the
earliest available history, plus a hint suggesting AS OF AT LEAST or
RETAIN HISTORY. Previously this surfaced as a raw internal dataflow-
creation error at best.
* The compute controller validates changelog import starts against the
import read holds in the validation phase of create_dataflow, as a
backstop, instead of failing the instance-side hold downgrade that
is expected to be infallible ("validated" panic).
* implement_slow_path_peek defuses its ExecuteContextGuard when
implement_peek_plan returns an error: the frontend logs the
error-ended execution, and the guard's auto-retire would end the
statement a second time, panicking statement logging. This was
reachable before this branch via any dataflow-creation error on the
frontend slow path (the old code panicked earlier, in the frontend
peek read-holds assertion, for the CHANGES repro).
Regression tests pin both strict-below-since forms (fixed and
sliding) on the new error message.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The frontend peek path blanket-wraps optimizer errors as internal errors, on the assumption that the peek optimizer only fails on bugs. ChangesHistoryUnavailable is a user mistake (a strict AS OF bound below the input's earliest retained history); pass it through with its error code and hint instead of prefixing it with "internal error in optimizer". Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Read-hold attribution is not specific to CHANGES; track it as a general introspection ask. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A strict (AS OF) sliding bound requires a full retained window instead
of silently aging in like the advisory (AS OF AT LEAST) form. The
naturally selected MV as_of is the least valid read — the inputs'
since — at which the window can never be full; the sequencer advances
the dataflow and storage as_of to since + strict_window, making the
window exactly the retained history, as long as that stays within the
greatest available read. Beyond it the input genuinely lacks a full
window and creation errors with ChangesHistoryUnavailable.
Strictness travels as ChangelogMode::Maintained { strict_window }, the
widest window among the strict reads of an import (possibly narrower
than the hold-sizing window when strict and advisory reads mix).
Enforcement is creation-time only: restarts resolve the start
advisorily regardless — erroring an existing view at bootstrap would
wedge the system — while the lagged dependency holds reproduce the
window exactly across restarts anyway.
Two fixes uncovered along the way, affecting the advisory form too:
changelog start resolution now reads the inputs' collection since via
freshly acquired holds — the transaction read holds reused for
timestamp selection (frontend sequencing) sit at the query timestamp,
which hid retained history and over-clamped advisory starts; and an
advisory view over an input with RETAIN HISTORY now serves the
retained part of its window immediately instead of aging in from the
creation time (tested in changes.td).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Non-temporal predicates on a changelog read's input columns commute with the changelog reinterpretation, so optimize_dataflow_filters now pushes them into the changelog source import's operators instead of skipping changelog imports wholesale. persist_source applies them before the reinterpretation, enabling part-stats pruning over retained history. PredicatePushdown reports predicates above a Changes leaf under the read collection's id, mirroring Get (including the bare-use clear, so only predicates common to every read are pushed). The import loop retains only predicates whose support lies within the raw source arity and that are non-temporal; predicates on the appended mz_timestamp/mz_diff columns stay above the leaf. refine_source_mfps keeps its changelog guard, since LIR Get MFPs are on the extended schema. Tests: EXPLAIN pushdown shapes in changes.slt (pushed, not pushable, blocked by an unfiltered sibling read, common across distinct bounds); data correctness for pushed and non-pushable filters in one-off and maintained reads in changes.td. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A dataflow's source imports are keyed by GlobalId and carry one read mode, so a query reading t plainly and via CHANGES(t ...) shared one import: rendering reinterpreted it as a changelog and the plain read consumed packed changelog rows. Join shapes happened to survive via arrangement thinning; scalar subqueries returned the changelog count for the plain read. Detect the combination at both optimizer sites that already walk the MIR for Changes nodes (peek and materialized view) and error with a hint. The detection is access-path independent: an index-mediated direct read binds the same id in the render context. The restriction is not information-theoretic: the raw import stream read at the earliest start determines both the direct read (advance times to as_of) and each changelog (advance to its start, consolidate, pack). Lifting it means moving the reinterpretation from the import site to the consumer side, which would also remove the shared-earliest-start restriction for one-off reads; recorded in the design doc. Tests: mixed reads error (direct, scalar subqueries, through an inlined view, MV creation); controls for two CHANGES of one input and a direct read of a different collection. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…yses The Changes node only ever emits diff +1 and retracts nothing itself; in the maintained case rows age out through the temporal filter above the node, which both analyses already treat correctly (temporal predicates reset monotonicity). Flip the conservative false to true in both, so aggregations over a changelog render monotonically (e.g. hierarchical min/max without retraction support). The pre-pack consolidation is unrelated to monotonicity and stays: it is value-defining (it nets the mz_diff column per (row, time) and collapses the snapshot) and cheap (consolidate_pact holds only in-flight updates, no retained arrangement); recorded in the design doc, along with what a per-consumer reinterpretation of a raw changelog import entails. Tests: EXPLAIN PHYSICAL shows Reduce::Hierarchical monotonic over a changelog in changes.slt; min/max data round-trip in changes.td; the NonNegative leaf regression test asserts the new value. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Distill the design discussion: deltas vs accumulations (negative mz_diff iff post-start retraction; reify time into data last), the algebraic definition of the surfaced form as count(*) BY cols, mz_timestamp over a diff-as-multiplicity Z-set, the three physical profiles for that count (reified-time reduce: unbounded, never lower to it; frontier-sealed netting: today's consolidate_pact; fused diff-linear aggregate: the introspection mz_message_counts pattern, state ~#keys), arranging the raw stream to serve direct readers and changelog consumers from one bounded trace, and the preference for idiomatic differential composition (SemigroupVariable/negate/concat/arrange, as in the monotonic TopK retraction loop and the self-correcting persist sink) over bespoke operators for optimizer re-use. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A changelog import now binds the *raw* stream, read from the import-level start; each consumer derives its own view at its Get site: * A changelog read (new GetPlan::Changelog) advances the raw event times to its own resolved start (collapsing its own snapshot), nets per (row, time) via consolidate_pact — the arrangement batcher without a retained trace — packs mz_timestamp/mz_diff, and advances emission to the as_of. The one-off start travels as resolved_start on the MIR Changes node, filled at timestamp resolution; the bound stays as written, keeping EXPLAIN deterministic. Maintained reads carry no per-read start and drop times at or below the import start. * A direct read of the same import advances the raw event times to the as_of, yielding the input's contents — mixed direct + CHANGES reads of one collection become legal (the ChangesMixedRead rejection is removed), and multiple one-off reads of one input collapse their own snapshots at their own starts (previously shared the earliest). * A maintained import includes the snapshot when a direct read shares it (ChangelogMode::Maintained.snapshot_for_direct_reads); the maintained changelog reads' unconditional drop of times at or below start makes that safe, and the all-changelog snapshot-fetch skip is preserved. Start resolution, holds, and controller validation stay import-level (the minimum start). See the design doc's per-consumer bullet for the prototype deviations from the LirId-keyed spec design. Tests: mixed-read slt rejections become data tests; testdrive covers the original mixed-read repro (1 3), same-input two-bounds per-read snapshot collapse, a strict look-back mixed MV (direct side sees the snapshot), and an advisory mixed-join MV; ChangesMaterializedView platform check passes RestartEntireMz; existing changes.td and changes.slt unchanged otherwise. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Extend the changelog-algebra section with three design outcomes: the next step of declaring the netting as an ordinary count(*) reduce over a ChangesRaw Z-set leaf (with the consolidate_pact rendering as the time-in-key lowering pattern, the fusion case becoming stock reduce algebra, and the analysis/escape-guard costs); the conditions under which the snapshot fetch can be skipped (maintained by construction, one-off via predicate pushdown into SnapshotMode, SUBSCRIBE's explicit spelling); and the two sharings across reads of one input (raw arrangement for the netting across starts, SemigroupVariable-bounded windowed packed arrangement across maintained readers — the prerequisite shape for indexes on changelogs). Mark per-consumer reinterpretation as merged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Rebase fallout: consolidate_pact grew an explicit chunker type parameter upstream. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2590c35 to
1f6cbb8
Compare
…lta tables - INTEGRATE becomes a read operator (dual of CHANGES) usable in plain MVs, not an object kind; it is the typing boundary (mz_timestamp/mz_diff are data inside its argument, gone in its result). Fix the dedup example accordingly, and note the timestamp-as-data stability pitfall (subject to the compaction clamp). - DELTA TABLEs are mutable: bounding is ordinary DELETE/UPDATE DML, defined as consolidating (retract at original mz_timestamp), distinct from forward age-out. The standing OCC pruner is deferred, keeping v1 bounding off the OCC critical path. - The only new standing object is the RECORD writer; cadence is frontier-driven (COMMIT EVERY rejected as an anti-pattern). - DELTA TABLE domain is inherited from the first RECORD writer by default, with IN DOMAIN as escape hatch; multiple writers per table are sound because the table-owned reclock recovers the merged A->B mapping; domain bound-once/immutable. - Record data-domain compaction as a deferred future capability. - CHANGES is an open PR (#36869), not shipped; keep CHANGES (not DIFFERENTIATE). - Update the implementation companion to match (architecture, change map, H1/H3, phases, open questions). https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
…RECORD wrapper From an agent review against Materialize's grammar (file:line) and prior art: - Carriers use a USING TIME/DIFF clause, not => named args: MZ has no named-arg support and => already lexes as map-entry; USING reuses a reserved keyword at near-zero parser cost. (=> is the cross-system convention; recorded as a possible future revisit, co-designed with #36869.) - CHANGES's AS OF parses like SUBSCRIBE (query) AS OF … — attaches to the operator, not an inner SELECT (resolves the 'AS OF only at outermost SELECT' concern). - Drop the AS RECORD(...) wrapper: CREATE RECORDER … INTO <table> AS <query> binds like MV's AS <query>; INTO matches CREATE SINK … INTO. Also dissolves the RECORD- verb vs composite-type 'record' overlap. - Reclock surfaced via EXPOSE RECLOCK AS <name> (the CREATE SOURCE … EXPOSE PROGRESS AS precedent), engine-written / user-read-only. - Replace IN DOMAIN with WITH (TIMELINE = …) — IN DOMAIN would overload IN and foreclose standard CREATE DOMAIN. - Document deliberate divergences: DIFF is a signed multiplicity (not an op-enum like Snowflake METADATA$ACTION / Flink row-kinds / Debezium c,u,d); CHANGES collides with Snowflake's CHANGES clause (kept knowingly). - Confirmed idiomatic as-is: INTO, RETAIN HISTORY FOR, WITH (opt = val), mz_now(). - Propagated through both docs (operations table, surface, reclock decision, open questions, alternatives; impl change-map + Phase 2). https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
…rg-free INTEGRATE Aligns the docs with the canonical Notion 'Recorders' page (same model; surface/ controller details reconciled): - Carriers + progress declared on the TABLE: CREATE TABLE … WITH (PROGRESS, TIMESTAMP = …, DIFF = …), initialized to (0,0) (empty progress = sealed frontier). - INTEGRATE(table) is argument-free — carriers and the progress collection come from the table; nothing to mismatch. (CHANGES carrier spelling still open / #36869.) - Standardize terminology: the object is the 'progress collection' (a source's progress precedent); 'reclock'/'reclocking' is the operation of mapping through it. - Controller obligations (new impl M5): writer registration = capability; progress- collection GC by tracking INTEGRATE consumers; RETAIN HISTORY sits on the INTEGRATE consumer/recorder, not the table (leaning INTEGRATE). - Drop-last-writer reframed as open: freeze vs seal (advance to empty/top frontier). - Framing: at-least-once application / at-most-once persistence. Kept the doc's sharper points that Notion lacks: data-domain compaction must be a clamp+GROUP BY/SUM reduce (a bare UPDATE is wrong), and the self-reference answer. https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
Motivation
Closes #4527
Users want to consume the change stream of a collection as a relation they can transform with SQL.
SUBSCRIBEalready exposes this stream, but only as a top-level streaming statement whose output flows directly to the client — it cannot be wrapped in aSELECT, aggregated, joined, or materialized.This PR implements the
CHANGEStable function, which reads a collection's changelog as a relation with the per-update timestamp and diff available as ordinarymz_timestamp/mz_diffcolumns.Syntax
<relation>is a collection named directly or a parenthesized subquery (mirroringSUBSCRIBE). A subquery must reduce to a bare read of a single persist-backed object (table, source, materialized view); anything that filters/transforms is the deferred arbitrary-expression case and is rejected. A view is rejected in both forms — it is not persist-backed, and a non-materialized view is not inlined at this planning stage (the subquery reduces to a bareGetof the view itself).AS OFvsAS OF AT LEAST— strict (error if the requested start is below the input'ssince) vs advisory (age in: clamp the bound up tosince). Reuses the existingAsOfgrammar; the parens around a subquery delimit it from this trailingAS OF.mz_now()-relative bound is a sliding window that trails the query time by its lag.The bound's shape transparently signals the semantics; the design deliberately reuses
AS OF(Materialize's logical-time read-hold meaning, distinct from SQL:2011), resolving the design's "parameter keyword" question.Where it is allowed
mz_now()-relative) boundSELECTas_of = query_time − lag; window[query_time − lag, query_time])sinceindefinitely)Implementation
src/sql-parser):TableFactor::Changes { relation: ChangesRelation, as_of: AsOf, .. }, whereChangesRelationisName | Query(mirrorsSubscribeRelation).AS OF [AT LEAST]clause required.src/sql/src/plan/):plan_changesresolves the relation (a name, or a subquery reduced to a bare globalGet) to a single persist-backed collection, classifies the bound (strict/advisory, fixed/sliding), gates by lifetime (a fixed bound is rejected in a maintained context), and builds the changelog schema (input columns +mz_timestamp+mz_diff). Feature-flagged byenable_changes_table_function.src/expr,src/sql/src/plan/):MirRelationExpr::Changes { id, typ, bound, strict }— an opaque leaf carrying the lower-bound scalar; lowers to aGetof the source import marked as a changelog read.src/adapter/src/optimize/peek.rs): evaluates each changelogboundat the query time (resolvingmz_now()), clamps an advisory (AS OF AT LEAST) bound up to the input'ssince, and marks each source importChangelogMode::OneShot { start }with its own resolved start. The dataflowas_ofstays the query time, somz_now()elsewhere in the query resolves to the query time; the controller installs each import's read hold at its start.src/compute/src/render.rs): the changelog import binds the raw stream, read from the import-level start, and each consumer reinterprets it at itsGetsite (GetPlan::Changelog): advance the raw event times to the read's own resolved start (collapsing its own snapshot), net per(row, time)viaconsolidate_pact(the arrangement batcher without a retained trace), pack each netted(row, time, diff)as the append(pack(row, time, diff), max(time, as_of), +1)— append-only, matchingSUBSCRIBE's diff format — then apply the MFP to the extended rows. A direct read of the same import advances the raw times to theas_of(yielding the input's contents), so a query may read a collection both directly and viaCHANGES, and multiple one-off reads of one input collapse their own snapshots at their own starts. A maintained import includes the snapshot only when a direct read shares it; the changelog reads drop times at or below the start themselves. The one-off start travels asresolved_starton the MIRChangesnode (the bound stays as written, keeping EXPLAIN deterministic). The changelogGet's LIR MFP stays on theGetand is not hoisted into the persist source operators (guard inPlan::refine_source_mfps).src/transform/):Changesis an opaque barrier leaf in every analysis; each analysis must give it an explicit value rather than reading a (nonexistent) input — e.g. theNonNegativeanalysis (whose catch-all otherwise underflows for a leaf at post-order index 0). One pushdown is wired up: non-temporal predicates on input columns commute with the changelog reinterpretation, sooptimize_dataflow_filterspushes them into the changelog source import (only when common to every read of the input), wherepersist_sourceapplies them before the reinterpretation — part-stats pruning over retained history. Predicates on the appendedmz_timestamp/mz_diffcolumns stay above the leaf.ChangelogMode::Maintained { window, start }; the MV optimizer extracts the constant window lag and wraps each changelog read in the temporal filtermz_now() < mz_timestamp + lag; the read start is resolved asjoin(since, as_of − window)at creation, environment restart (as_of_selection), and replica reconnect (command-history reduction); the compute controller lags the import's read hold by the window so a restart can reproduce the persisted contents exactly; rendering skips the snapshot and replays deltas from the start. The window is capped by the newchanges_max_windowsystem variable (default 1 day), enforced at creation only so lowering the cap cannot wedge existing objects at bootstrap. A strict sliding bound (AS OF, vsAS OF AT LEAST) requires a fully retained window: the sequencer advances the view'sas_oftosince + windowwhen the input retains that much history, and errors otherwise — never silently serving a partial window. Start resolution reads the inputs' collectionsincevia fresh read holds (the reused transaction holds sit at the query timestamp and hid retained history, over-clamping advisory starts too).Verification
test/sqllogictest/changes.slt: feature gating; argument validation (persist-backed only; views and CTEs rejected, by name and via subquery); bound validation (non-constant / out-of-range rejected); durable-lifetime gating; advisory clamp; slidingmz_now()plan + run; maintained-MV creation and rejection cases (strict sliding, month intervals, REFRESH); thechanges_max_windowcap; predicate-pushdown EXPLAIN shapes (pushed, not pushable, blocked by an unfiltered sibling read); mixed direct + CHANGES reads (direct, scalar subqueries, through an inlined view, MV creation).test/testdrive/changes.td: data round-trips — snapshot atas_ofplus later inserts / deletes / updates as appends with correctmz_diff; aggregation composition; sliding bound; subquery form;CHANGESover a materialized view; strictAS OFreading back into aRETAIN HISTORYwindow; a maintained sliding-window MV aging in and appending deltas; pushed and non-pushable filters in one-off and maintained reads; the mixed direct + changelog data round-trip; same-input reads with distinct bounds collapsing their own snapshots; mixed maintained views (strict look-back where the direct side needs the snapshot, and an advisory join).ChangesMaterializedViewplatform check (misc/python/materialize/checks/all_checks/changes.py): end-to-end restart-exact reproduction — exact row multisets catch spurious snapshot rows, cross-phasemz_timestampordering catches a re-snapshot — across the restart/upgrade scenarios, including 0dt. Verified locally withRestartEntireMz.pack_changelog_row,NonNegative-leaf, andas_of_selectionchangelog-constraint unit tests.Follows the design doc (
doc/developer/design/20260602_changes_table_function.md).Follow-ups (tracked in the design doc's "Remaining work")
mz_internalrelation).CHANGES— generalize the subquery form beyond a bare read (the time-invariance / optimizer-barrier work).https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M